package cn.cyk.kafkatoolgui.controller;

import javafx.fxml.FXML;
import javafx.scene.control.Alert;
import javafx.scene.control.TextArea;
import javafx.scene.control.TextField;
import javafx.scene.input.KeyCode;
import javafx.scene.input.KeyEvent;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
 * kafka生产者控制器类
 *
 * @author : YongKun CAO
 * @date : 2021/8/18 9:46 下午
 **/
public class KafkaProducerController {

    @FXML
    private TextField addr;

    @FXML
    private TextField topic;

    @FXML
    private TextArea content;

    @FXML
    protected void onSendButtonClick() {
        String addrText = addr.getText();
        if (addr.getText().equals("")) {
            alert("Kafka服务链接IP和端口为空，请填写。");
        } else if (topic.getText().equals("")) {
            alert("Kafka主题名为空，请填写。");
        } else if (content.getText().equals("")) {
            alert("Kafka消息内容为空，请填写。");
        } else {
            kafkaProducerHandle(
                    addr.getText().trim(),
                    topic.getText().trim(),
                    content.getText().trim()
            );
        }
    }

    /**
     * 设置回车快捷键
     *
     * @param key
     */
    @FXML
    public void keyPressedHandle(KeyEvent key) {
        if (key.getCode() == KeyCode.ENTER) {
            onSendButtonClick();
        }
    }

    /**
     * 消息提示框
     *
     * @param test
     */
    public void alert(String test) {
        Alert alert = new Alert(Alert.AlertType.WARNING);
        alert.titleProperty().set("警告");
        alert.headerTextProperty().set(test);
        alert.showAndWait();
    }

    public void kafkaProducerHandle(String addr, String topic, String content) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, addr);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.CLIENT_ID_CONFIG, "producer.client.id.name");

        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        ProducerRecord<String, String> record = new ProducerRecord<>(topic, content);

        try {
            producer.send(record);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
